package com.example.demo.stream;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.demo.entity.EmployeePo;
import com.example.demo.env.SpringContextHolder;
import com.example.demo.mapper.EmployeePoMapper;
import com.example.demo.stream.deal.CountAggregate;
import com.example.demo.stream.source.DbSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * flink连接mySQL demo
 */
public class DataStreamMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 读mysql数据
        LambdaQueryWrapper<EmployeePo> queryWrapper = Wrappers.lambdaQuery();
//        queryWrapper.eq(EmployeePo::getUserName, "且听风吟");
        DataStream<EmployeePo> dataSource = env.addSource(new DbSource<>(EmployeePoMapper.class, queryWrapper)).returns(EmployeePo.class);
//        dataSource.map(new MapFunction<EmployeePo, EmployeePo>() {
//            @Override
//            public EmployeePo map(EmployeePo value) throws Exception {
//                System.out.println("123123213");
//                return value;
//            }
//        });
//        dataSource.assignTimestampsAndWatermarks(WatermarkStrategy.<EmployeePo>forBoundedOutOfOrderness(Duration.ZERO)
//                .withTimestampAssigner((SerializableTimestampAssigner<EmployeePo>) (employeePo, recordTimestamp) -> System.currentTimeMillis()));
        // 所有数据设置相同的 key
        dataSource.keyBy(data -> true)
                // 事件时间 会话窗口
                .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
                // 聚合操作 统计人数
                .aggregate(new CountAggregate()).print();
        // 关闭ApplicationContext 资源
        SpringContextHolder.close();
        // 执行返回结果
        System.out.println(env.execute("flink job operator"));
    }
}

